End-to-End Spelling Correction System: From Data Ingestion to Cloud Deployment
Welcome to the documentation of SpellSeqAI, an end-to-end MLOps project for automated spelling correction. This repository demonstrates a complete MLOps solution that streamlines the development, deployment, and maintenance of machine learning models dedicated to spelling correction.
MLOps combines machine learning workflows with DevOps principles, delivering a framework that ensures reproducibility, scalability, and automation throughout the model lifecycle, from initial data preprocessing to production deployment and continuous monitoring.
Create a production-ready spelling correction system that demonstrates best practices in MLOps, from data preprocessing to model deployment, ensuring seamless collaboration between data science and operations teams.
Python 3.8
BERT/NLP
Docker
AWS Cloud
MLflow
DVC
Flask
Grafana
Spello
Keras
# src/components/data_ingestion.py
import os
import zipfile
import pandas as pd
from pathlib import Path
from src.utils.common import create_directories, get_size
from src.entity.config_entity import DataIngestionConfig
from src import logger
class DataIngestion:
def __init__(self, config: DataIngestionConfig):
self.config = config
def download_data(self):
"""Download dataset from Kaggle"""
try:
dataset_url = self.config.source_URL
zip_download_dir = self.config.local_data_file
os.makedirs("artifacts/data_ingestion", exist_ok=True)
# Download using kaggle API
os.system(f"kaggle datasets download -d {dataset_url} -p {zip_download_dir}")
logger.info(f"Dataset downloaded to {zip_download_dir}")
except Exception as e:
logger.error(f"Error downloading data: {e}")
raise e
def extract_zip_file(self):
"""Extract the downloaded zip file"""
try:
unzip_path = self.config.unzip_dir
os.makedirs(unzip_path, exist_ok=True)
with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
zip_ref.extractall(unzip_path)
logger.info(f"Extracted zip file to {unzip_path}")
except Exception as e:
logger.error(f"Error extracting zip file: {e}")
raise e
def validate_data_schema(self):
"""Validate the ingested data schema"""
try:
data_path = os.path.join(self.config.unzip_dir, "spelling_correction.csv")
df = pd.read_csv(data_path)
# Schema validation
expected_columns = ['wrong', 'right']
assert all(col in df.columns for col in expected_columns), "Schema validation failed"
# Quality checks
assert df.isnull().sum().sum() < len(df) * 0.1, "Too many null values"
assert len(df) > 1000, "Dataset too small"
logger.info("Data schema validation passed")
return True
except Exception as e:
logger.error(f"Schema validation failed: {e}")
raise e
Schema validation: column structure, data types, format consistency
Quality checks: null values, outliers, data completeness
Integrity checks: data consistency, duplicate detection (see the sketch below)
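The schema and quality checks are implemented in validate_data_schema above; the consistency and duplicate checks can be expressed in the same style. A minimal sketch, assuming the ingested CSV exposes the same 'wrong' and 'right' columns (the function name and threshold are illustrative, not the project's actual settings):

# Illustrative integrity checks (sketch, not the actual project component)
import pandas as pd

def run_integrity_checks(data_path: str, max_duplicate_ratio: float = 0.05) -> dict:
    """Report duplicate pairs and conflicting corrections in the ingested data."""
    df = pd.read_csv(data_path)
    # Duplicate detection: identical (wrong, right) pairs add no new training signal
    duplicate_ratio = df.duplicated(subset=['wrong', 'right']).mean()
    # Consistency: the same misspelling mapped to more than one correction
    conflicting = int(df.groupby('wrong')['right'].nunique().gt(1).sum())
    return {
        'duplicate_ratio': float(duplicate_ratio),
        'conflicting_corrections': conflicting,
        'passed': duplicate_ratio <= max_duplicate_ratio,
    }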
Data Version Control (DVC) ensures reproducible ML workflows by tracking data, models, and experiments alongside Git for complete project versioning.
# dvc.yaml - Enhanced Pipeline Configuration
stages:
data_ingestion:
cmd: python src/pipeline/stage_01_data_ingestion.py
deps:
- src/pipeline/stage_01_data_ingestion.py
- config/config.yaml
outs:
- artifacts/data_ingestion/spelling_correction.csv
data_validation:
cmd: python src/pipeline/stage_02_data_validation.py
deps:
- src/pipeline/stage_02_data_validation.py
- config/config.yaml
- artifacts/data_ingestion/spelling_correction.csv
outs:
- artifacts/data_validation/status.txt
metrics:
- artifacts/data_validation/validation_metrics.json
data_transformation:
cmd: python src/pipeline/stage_03_data_transformation.py
deps:
- src/pipeline/stage_03_data_transformation.py
- config/config.yaml
- artifacts/data_ingestion/spelling_correction.csv
- artifacts/data_validation/status.txt
outs:
- artifacts/data_transformation/train.csv
- artifacts/data_transformation/test.csv
- artifacts/data_transformation/preprocessor.pkl
model_trainer:
cmd: python src/pipeline/stage_04_model_trainer.py
deps:
- src/pipeline/stage_04_model_trainer.py
- config/config.yaml
- artifacts/data_transformation/train.csv
- artifacts/data_transformation/test.csv
- artifacts/data_transformation/preprocessor.pkl
outs:
- artifacts/model_trainer/bert_spell_corrector.h5
- artifacts/model_trainer/tokenizer/
metrics:
- artifacts/model_trainer/metrics.json
plots:
- artifacts/model_trainer/training_history.json
model_evaluation:
cmd: python src/pipeline/stage_05_model_evaluation.py
deps:
- src/pipeline/stage_05_model_evaluation.py
- config/config.yaml
- artifacts/model_trainer/bert_spell_corrector.h5
- artifacts/data_transformation/test.csv
metrics:
- artifacts/model_evaluation/evaluation_metrics.json
plots:
- artifacts/model_evaluation/confusion_matrix.json
plots:
- artifacts/model_trainer/training_history.json:
x: epoch
y:
- accuracy
- val_accuracy
- loss
- val_loss
- artifacts/model_evaluation/confusion_matrix.json:
template: confusion
x: actual
y: predicted
# .dvc/config - AWS S3 Remote Storage
[core]
remote = myremote
autostage = true
['remote "myremote"']
url = s3://spellseqai-dvc-storage/data
region = us-east-1
profile = default
# Setup Commands
$ dvc init
$ dvc remote add -d myremote s3://spellseqai-dvc-storage/data
$ dvc add artifacts/data_ingestion/spelling_correction.csv
$ dvc push
# Pipeline Execution
$ dvc repro # Run entire pipeline
$ dvc dag # Visualize pipeline dependencies
$ dvc plots show # Display training plots
Spello: a ready-to-use spell correction library with pre-trained models for rapid deployment (minimal usage sketch below)
Custom BERT trainer: a bespoke training implementation that allows fine-tuning on domain-specific datasets
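Before the custom trainer, here is a minimal Spello usage sketch; it relies only on the API calls already used by the training component further below (the sample sentences are illustrative):

# Minimal Spello usage (illustrative sketch)
from spello.model import SpellCorrectionModel

sp = SpellCorrectionModel(language='en')
# Train on a small corpus of correctly spelled sentences (illustrative data)
sp.train(["this is a correctly spelled sentence", "spelling correction models need clean text"])

result = sp.spell_correct("this is a correctly speled sentense")
print(result['spell_corrected_text'])  # corrected text returned by Spello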
# src/utils/trainer.py - Enhanced Custom BERT Trainer
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertForMaskedLM, BertConfig
from transformers import AdamW, get_linear_schedule_with_warmup
from torch.utils.data import DataLoader, Dataset
import numpy as np
from tqdm import tqdm
import logging
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
class SpellCorrectionDataset(Dataset):
def __init__(self, texts, corrected_texts, tokenizer, max_length=128):
self.texts = texts
self.corrected_texts = corrected_texts
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = str(self.texts[idx])
corrected = str(self.corrected_texts[idx])
# Tokenize input and target
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
target_encoding = self.tokenizer(
corrected,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': target_encoding['input_ids'].flatten()
}
class EnhancedBERTSpellCorrector(nn.Module):
def __init__(self, model_name='bert-base-uncased', num_labels=None):
super(EnhancedBERTSpellCorrector, self).__init__()
self.bert = BertForMaskedLM.from_pretrained(model_name)
self.dropout = nn.Dropout(0.3)
self.classifier = nn.Linear(self.bert.config.hidden_size, self.bert.config.vocab_size)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.bert(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels
)
return outputs
class CustomBERTTrainer:
def __init__(self, config):
self.config = config
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.logger = logging.getLogger(__name__)
# Initialize tokenizer and model
self.tokenizer = BertTokenizer.from_pretrained(config.model_name)
self.model = EnhancedBERTSpellCorrector(config.model_name)
self.model.to(self.device)
# Training parameters
self.learning_rate = config.learning_rate
self.batch_size = config.batch_size
self.epochs = config.epochs
self.warmup_steps = config.warmup_steps
def prepare_data_loaders(self, train_texts, train_labels, val_texts, val_labels):
"""Prepare training and validation data loaders"""
train_dataset = SpellCorrectionDataset(
train_texts, train_labels, self.tokenizer, self.config.max_length
)
val_dataset = SpellCorrectionDataset(
val_texts, val_labels, self.tokenizer, self.config.max_length
)
train_loader = DataLoader(
train_dataset, batch_size=self.batch_size, shuffle=True
)
val_loader = DataLoader(
val_dataset, batch_size=self.batch_size, shuffle=False
)
return train_loader, val_loader
def setup_optimizer_and_scheduler(self, train_loader):
"""Setup optimizer and learning rate scheduler"""
optimizer = AdamW(
self.model.parameters(),
lr=self.learning_rate,
weight_decay=0.01
)
total_steps = len(train_loader) * self.epochs
scheduler = get_linear_schedule_with_warmup(
optimizer,
num_warmup_steps=self.warmup_steps,
num_training_steps=total_steps
)
return optimizer, scheduler
def custom_loss_function(self, predictions, targets, attention_mask):
"""Enhanced loss function with attention masking"""
loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
# Reshape predictions and targets
predictions = predictions.view(-1, predictions.size(-1))
targets = targets.view(-1)
# Apply attention mask
masked_predictions = predictions[attention_mask.view(-1) == 1]
masked_targets = targets[attention_mask.view(-1) == 1]
loss = loss_fct(masked_predictions, masked_targets)
return loss
def train_epoch(self, train_loader, optimizer, scheduler):
"""Train for one epoch"""
self.model.train()
total_loss = 0
progress_bar = tqdm(train_loader, desc="Training")
for batch in progress_bar:
# Move batch to device
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['labels'].to(self.device)
# Forward pass
optimizer.zero_grad()
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels
)
loss = outputs.loss
# Backward pass
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
optimizer.step()
scheduler.step()
total_loss += loss.item()
progress_bar.set_postfix({'loss': loss.item()})
return total_loss / len(train_loader)
def evaluate(self, val_loader):
"""Evaluate model on validation set"""
self.model.eval()
total_loss = 0
all_predictions = []
all_targets = []
with torch.no_grad():
for batch in tqdm(val_loader, desc="Evaluating"):
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['labels'].to(self.device)
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels
)
loss = outputs.loss
total_loss += loss.item()
# Collect predictions for metrics calculation
predictions = torch.argmax(outputs.logits, dim=-1)
all_predictions.extend(predictions.cpu().numpy())
all_targets.extend(labels.cpu().numpy())
# Calculate metrics
avg_loss = total_loss / len(val_loader)
return avg_loss, all_predictions, all_targets
def train(self, train_texts, train_labels, val_texts, val_labels):
"""Main training loop"""
self.logger.info("Starting enhanced BERT training...")
# Prepare data loaders
train_loader, val_loader = self.prepare_data_loaders(
train_texts, train_labels, val_texts, val_labels
)
# Setup optimizer and scheduler
optimizer, scheduler = self.setup_optimizer_and_scheduler(train_loader)
# Training history
history = {
'train_loss': [],
'val_loss': [],
'learning_rate': []
}
best_val_loss = float('inf')
for epoch in range(self.epochs):
self.logger.info(f"Epoch {epoch + 1}/{self.epochs}")
# Train
train_loss = self.train_epoch(train_loader, optimizer, scheduler)
# Evaluate
val_loss, predictions, targets = self.evaluate(val_loader)
# Update history
history['train_loss'].append(train_loss)
history['val_loss'].append(val_loss)
history['learning_rate'].append(scheduler.get_last_lr()[0])
self.logger.info(f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
# Save best model
if val_loss < best_val_loss:
best_val_loss = val_loss
self.save_model(self.config.model_dir)
self.logger.info("New best model saved!")
return history
def save_model(self, model_path):
"""Save trained model and tokenizer"""
import os
os.makedirs(model_path, exist_ok=True)
# Save model
torch.save(self.model.state_dict(), os.path.join(model_path, 'model.pth'))
# Save tokenizer
self.tokenizer.save_pretrained(os.path.join(model_path, 'tokenizer'))
self.logger.info(f"Model saved to {model_path}")
def predict(self, text):
"""Predict spelling correction for input text"""
self.model.eval()
# Tokenize input
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.config.max_length,
return_tensors='pt'
)
input_ids = encoding['input_ids'].to(self.device)
attention_mask = encoding['attention_mask'].to(self.device)
with torch.no_grad():
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
predictions = torch.argmax(outputs.logits, dim=-1)
# Decode predictions
corrected_text = self.tokenizer.decode(predictions[0], skip_special_tokens=True)
return corrected_text
# src/components/model_trainer.py - Enhanced Model Training Component
import os
import json
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import joblib
import mlflow
import mlflow.pytorch
from src.utils.trainer import CustomBERTTrainer
from src.entity.config_entity import ModelTrainerConfig
from src import logger
class ModelTrainer:
def __init__(self, config: ModelTrainerConfig):
self.config = config
def load_and_preprocess_data(self):
"""Load and preprocess training data"""
try:
# Load processed data
train_data = pd.read_csv(self.config.train_data_path)
test_data = pd.read_csv(self.config.test_data_path)
# Prepare training data
X_train = train_data['wrong'].values
y_train = train_data['right'].values
X_test = test_data['wrong'].values
y_test = test_data['right'].values
# Additional validation split
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.2, random_state=42
)
logger.info(f"Training data shape: {X_train.shape}")
logger.info(f"Validation data shape: {X_val.shape}")
logger.info(f"Test data shape: {X_test.shape}")
return X_train, X_val, X_test, y_train, y_val, y_test
except Exception as e:
logger.error(f"Error loading data: {e}")
raise e
def train_dual_models(self):
"""Train both Spello and Custom BERT models"""
try:
# Start MLflow run
with mlflow.start_run(run_name="SpellSeqAI_Dual_Training"):
# Load data
X_train, X_val, X_test, y_train, y_val, y_test = self.load_and_preprocess_data()
# Log parameters
mlflow.log_param("model_architecture", "Dual_Spello_BERT")
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
mlflow.log_param("test_size", len(X_test))
# Train Spello Model
logger.info("Training Spello model...")
spello_metrics = self.train_spello_model(X_train, y_train, X_val, y_val)
# Train Custom BERT Model
logger.info("Training Custom BERT model...")
bert_metrics = self.train_bert_model(X_train, y_train, X_val, y_val)
# Compare and select best model
best_model_info = self.compare_models(spello_metrics, bert_metrics)
# Final evaluation on test set
test_metrics = self.evaluate_on_test_set(X_test, y_test, best_model_info)
# Log final metrics
mlflow.log_metrics(test_metrics)
# Save model artifacts
self.save_model_artifacts(best_model_info)
logger.info("Dual model training completed successfully!")
return best_model_info, test_metrics
except Exception as e:
logger.error(f"Error in dual model training: {e}")
raise e
def train_spello_model(self, X_train, y_train, X_val, y_val):
"""Train Spello model"""
try:
from spello.model import SpellCorrectionModel
# Initialize Spello model
sp = SpellCorrectionModel(language='en')
# Prepare training data for Spello
train_corpus = []
for wrong, right in zip(X_train, y_train):
train_corpus.extend([wrong, right])
# Train model
sp.train(train_corpus)
# Validate model
predictions = []
for text in X_val:
try:
corrected = sp.spell_correct(text)
predictions.append(corrected['spell_corrected_text'])
except:
predictions.append(text)
# Calculate metrics
accuracy = accuracy_score(y_val, predictions)
# MLflow logging
mlflow.log_metric("spello_validation_accuracy", accuracy)
# Save model
spello_model_path = os.path.join(self.config.root_dir, "spello_model.pkl")
sp.save(spello_model_path)
metrics = {
'model_type': 'spello',
'accuracy': accuracy,
'model_path': spello_model_path
}
logger.info(f"Spello model accuracy: {accuracy:.4f}")
return metrics
except Exception as e:
logger.error(f"Error training Spello model: {e}")
raise e
def train_bert_model(self, X_train, y_train, X_val, y_val):
"""Train Custom BERT model"""
try:
# Initialize custom trainer
trainer_config = self.config.bert_config
trainer = CustomBERTTrainer(trainer_config)
# Train model
history = trainer.train(X_train, y_train, X_val, y_val)
# Calculate final accuracy
val_predictions = []
for text in X_val:
corrected = trainer.predict(text)
val_predictions.append(corrected)
accuracy = accuracy_score(y_val, val_predictions)
# MLflow logging
mlflow.log_metric("bert_validation_accuracy", accuracy)
mlflow.log_param("bert_learning_rate", trainer_config.learning_rate)
mlflow.log_param("bert_batch_size", trainer_config.batch_size)
mlflow.log_param("bert_epochs", trainer_config.epochs)
# Log training history
for epoch, loss in enumerate(history['train_loss']):
mlflow.log_metric("bert_train_loss", loss, step=epoch)
mlflow.log_metric("bert_val_loss", history['val_loss'][epoch], step=epoch)
metrics = {
'model_type': 'bert',
'accuracy': accuracy,
'model_path': trainer_config.model_dir,
'history': history
}
logger.info(f"BERT model accuracy: {accuracy:.4f}")
return metrics
except Exception as e:
logger.error(f"Error training BERT model: {e}")
raise e
def compare_models(self, spello_metrics, bert_metrics):
"""Compare models and select the best one"""
try:
spello_acc = spello_metrics['accuracy']
bert_acc = bert_metrics['accuracy']
if bert_acc > spello_acc:
best_model = bert_metrics
logger.info(f"BERT model selected (Accuracy: {bert_acc:.4f} vs {spello_acc:.4f})")
else:
best_model = spello_metrics
logger.info(f"Spello model selected (Accuracy: {spello_acc:.4f} vs {bert_acc:.4f})")
# Log model selection
mlflow.log_param("selected_model", best_model['model_type'])
mlflow.log_metric("best_model_accuracy", best_model['accuracy'])
return best_model
except Exception as e:
logger.error(f"Error comparing models: {e}")
raise e
def evaluate_on_test_set(self, X_test, y_test, best_model_info):
"""Evaluate best model on test set"""
try:
if best_model_info['model_type'] == 'spello':
from spello.model import SpellCorrectionModel
sp = SpellCorrectionModel(language='en')
sp.load(best_model_info['model_path'])
predictions = []
for text in X_test:
try:
corrected = sp.spell_correct(text)
predictions.append(corrected['spell_corrected_text'])
except:
predictions.append(text)
else: # BERT model
trainer_config = self.config.bert_config
trainer = CustomBERTTrainer(trainer_config)
# Load trained model
trainer.model.load_state_dict(
torch.load(os.path.join(best_model_info['model_path'], 'model.pth'))
)
predictions = []
for text in X_test:
corrected = trainer.predict(text)
predictions.append(corrected)
# Calculate test metrics
test_accuracy = accuracy_score(y_test, predictions)
classification_rep = classification_report(y_test, predictions, output_dict=True)
test_metrics = {
'test_accuracy': test_accuracy,
'test_precision': classification_rep['weighted avg']['precision'],
'test_recall': classification_rep['weighted avg']['recall'],
'test_f1_score': classification_rep['weighted avg']['f1-score']
}
logger.info(f"Test accuracy: {test_accuracy:.4f}")
return test_metrics
except Exception as e:
logger.error(f"Error evaluating on test set: {e}")
raise e
def save_model_artifacts(self, best_model_info):
"""Save model artifacts and metadata"""
try:
# Save model metadata
metadata = {
'model_type': best_model_info['model_type'],
'model_path': best_model_info['model_path'],
'accuracy': best_model_info['accuracy'],
'training_timestamp': pd.Timestamp.now().isoformat()
}
metadata_path = os.path.join(self.config.root_dir, "model_metadata.json")
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=4)
# Register model with MLflow
if best_model_info['model_type'] == 'bert':
mlflow.pytorch.log_model(
best_model_info['model_path'],
"spell_correction_model",
registered_model_name="SpellSeqAI_BERT"
)
logger.info("Model artifacts saved successfully!")
except Exception as e:
logger.error(f"Error saving model artifacts: {e}")
raise e
# Dockerfile - Multi-stage Production Build
# Stage 1: Build dependencies
FROM python:3.8-slim as builder
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir --user -r requirements.txt
# Stage 2: Production image
FROM python:3.8-slim
WORKDIR /app
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN useradd --create-home --shell /bin/bash spellseqai
# Copy Python packages from the builder stage into the non-root user's home
COPY --from=builder --chown=spellseqai:spellseqai /root/.local /home/spellseqai/.local
# Copy application code
COPY --chown=spellseqai:spellseqai . .
USER spellseqai
# Set environment variables
ENV PATH=/home/spellseqai/.local/bin:$PATH
ENV PYTHONPATH=/app
ENV FLASK_APP=app.py
ENV FLASK_ENV=production
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Run application
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "4", "--timeout", "120", "app:app"]
Multi-stage build: optimized image size with separate build and runtime stages
Environment parity: identical runtime across development, staging, and production
Lean footprint: minimal resource usage with efficient dependency management (local build commands below)
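A typical local build-and-smoke-test sequence for this image might look as follows; the image name and tag are illustrative, shown in the same command style as the DVC setup above:

# Local Build & Smoke Test (illustrative)
$ docker build -t spellseqai:latest .
$ docker run -d -p 8080:8080 --name spellseqai spellseqai:latest
$ curl -f http://localhost:8080/health   # same endpoint as the container HEALTHCHECK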
# .github/workflows/mlops-pipeline.yml
name: SpellSeqAI MLOps Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
env:
AWS_REGION: us-east-1
ECR_REPOSITORY: spellseqai
ECS_SERVICE: spellseqai-service
ECS_CLUSTER: spellseqai-cluster
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.8'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run tests
run: |
pytest tests/ --cov=src --cov-report=xml --cov-report=html
- name: Run linting
run: |
flake8 src/
black --check src/
isort --check-only src/
- name: Security scan
run: |
bandit -r src/
safety check
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
build-and-deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
- name: Build, tag, and push image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
IMAGE_TAG: ${{ github.sha }}
run: |
docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
echo "::set-output name=image::$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG"
- name: Deploy to Amazon ECS
uses: aws-actions/amazon-ecs-deploy-task-definition@v1
with:
task-definition: task-definition.json
service: ${{ env.ECS_SERVICE }}
cluster: ${{ env.ECS_CLUSTER }}
wait-for-service-stability: true
- name: Run integration tests
run: |
# Wait for deployment
sleep 60
# Run integration tests against deployed service
pytest tests/integration/ --endpoint=${{ env.DEPLOYMENT_URL }}
- name: Rollback on failure
if: failure()
run: |
aws ecs update-service --cluster ${{ env.ECS_CLUSTER }} \
--service ${{ env.ECS_SERVICE }} --force-new-deployment
notification:
needs: [test, build-and-deploy]
runs-on: ubuntu-latest
if: always()
steps:
- name: Notify deployment status
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
channel: '#deployments'
webhook_url: ${{ secrets.SLACK_WEBHOOK }}
Auto scaling: dynamic scaling based on CPU and memory utilization
Load balancing: high availability with traffic distribution
Security: IAM role-based access and SSL encryption
Content delivery: professional domain with CloudFront CDN
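The deploy step above references task-definition.json, which is not reproduced in this document. A minimal Fargate-style sketch of what such a definition could look like is shown below; the account ID, IAM role, and log group names are placeholders rather than the project's actual values:

# task-definition.json - Minimal Fargate task definition (illustrative sketch)
{
  "family": "spellseqai",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "1024",
  "memory": "2048",
  "executionRoleArn": "arn:aws:iam::<ACCOUNT_ID>:role/ecsTaskExecutionRole",
  "containerDefinitions": [
    {
      "name": "spellseqai",
      "image": "<ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/spellseqai:latest",
      "essential": true,
      "portMappings": [{ "containerPort": 8080, "protocol": "tcp" }],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/spellseqai",
          "awslogs-region": "us-east-1",
          "awslogs-stream-prefix": "app"
        }
      }
    }
  ]
}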
# monitoring/grafana-dashboard.json - Grafana Dashboard Configuration
{
"dashboard": {
"id": null,
"title": "SpellSeqAI MLOps Dashboard",
"tags": ["mlops", "spellseqai"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "Model Accuracy Over Time",
"type": "graph",
"targets": [
{
"expr": "model_accuracy_score",
"legendFormat": "Accuracy",
"refId": "A"
}
],
"yAxes": [
{
"min": 0,
"max": 1,
"unit": "percentunit"
}
]
},
{
"id": 2,
"title": "Prediction Latency",
"type": "stat",
"targets": [
{
"expr": "avg(prediction_duration_seconds)",
"legendFormat": "Avg Latency",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 0.5},
{"color": "red", "value": 1.0}
]
}
}
}
},
{
"id": 3,
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total{status=~\"5..\"}[5m])",
"legendFormat": "Error Rate",
"refId": "C"
}
]
},
{
"id": 4,
"title": "Active Users",
"type": "stat",
"targets": [
{
"expr": "sum(active_users)",
"legendFormat": "Active Users",
"refId": "D"
}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "30s"
}
}
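The panel queries above (model_accuracy_score, prediction_duration_seconds, http_requests_total) are Prometheus-style expressions; this document does not show how those series are produced. A minimal sketch of exposing the two custom series with prometheus_client is given below, assuming Prometheus scrapes the service and serves as the Grafana data source:

# Illustrative Prometheus exposition for the dashboard panels above (sketch)
from prometheus_client import Gauge, start_http_server

# Metric names match the Grafana queries; exposing them via Prometheus is an assumption
MODEL_ACCURACY = Gauge('model_accuracy_score', 'Validation accuracy of the deployed model')
PREDICTION_LATENCY = Gauge('prediction_duration_seconds', 'Latency of the latest /predict request in seconds')

def start_metrics_endpoint(port: int = 9100) -> None:
    """Call once at application startup; exposes /metrics for Prometheus to scrape (port is illustrative)."""
    start_http_server(port)

def record_prediction(accuracy: float, latency_seconds: float) -> None:
    """Update the exported series after each prediction request."""
    MODEL_ACCURACY.set(accuracy)
    PREDICTION_LATENCY.set(latency_seconds)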
# AWS CloudWatch Custom Metrics
import boto3
import time
from datetime import datetime
class CloudWatchMetrics:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
def put_custom_metric(self, metric_name, value, unit='Count'):
"""Send custom metric to CloudWatch"""
try:
self.cloudwatch.put_metric_data(
Namespace='SpellSeqAI/Application',
MetricData=[
{
'MetricName': metric_name,
'Value': value,
'Unit': unit,
'Timestamp': datetime.utcnow()
}
]
)
except Exception as e:
print(f"Error sending metric {metric_name}: {e}")
def log_prediction_metrics(self, accuracy, latency, error_count=0):
"""Log prediction-related metrics"""
self.put_custom_metric('ModelAccuracy', accuracy, 'Percent')
self.put_custom_metric('PredictionLatency', latency, 'Seconds')
self.put_custom_metric('PredictionErrors', error_count, 'Count')
def log_user_metrics(self, active_users, total_requests):
"""Log user interaction metrics"""
self.put_custom_metric('ActiveUsers', active_users, 'Count')
self.put_custom_metric('TotalRequests', total_requests, 'Count')
# Usage in Flask app
from flask import Flask, request, jsonify
import time
app = Flask(__name__)
metrics = CloudWatchMetrics()
@app.route('/predict', methods=['POST'])
def predict_spelling():
start_time = time.time()
try:
# Get input text
data = request.json
text = data.get('text', '')
# Make prediction (your model logic here)
corrected_text = spell_correction_model.predict(text)
# Calculate metrics
latency = time.time() - start_time
# Log metrics
metrics.log_prediction_metrics(
accuracy=0.94, # Your model's accuracy
latency=latency,
error_count=0
)
return jsonify({
'original': text,
'corrected': corrected_text,
'latency': latency
})
except Exception as e:
# Log error
metrics.log_prediction_metrics(
accuracy=0,
latency=time.time() - start_time,
error_count=1
)
return jsonify({'error': str(e)}), 500
The SpellSeqAI project represents a comprehensive integration of machine learning, software engineering, and MLOps best practices. This end-to-end solution demonstrates proficiency in modern ML infrastructure, from data ingestion and version control to production deployment and continuous monitoring.